【Kinesis Video Streams】Lambda関数でKinesis Video Streamsから動画を取得する
はじめに
現在、カフェのシステムでは、機械学習を用いて、カメラを用いて動画を撮影し、商品の前にいる人物の骨格や手を検出することで、どのユーザがどの商品を取り出したかを判定しています。
今までは、骨格検出モデルを用いてエッジデバイスで動画を推論処理(撮影した画像から映っている人物の骨格の座標を検出する処理)を実行する、という構成で処理をしていました。今後、エッジ側のデバイスの費用を下げたり、骨格検出以外の処理を増やすことを考えているため、エッジデバイスからクラウドに動画を送信し、クラウド側で様々な処理を実行する、という構成を検討しています。
前回までの記事で、エッジデバイスで動画をエンコードし、動画ファイルをクラウド(AWS Kinesis Video Streams)に送信する方法について記載しました。
目的
今回は、Kinesis Video Streams(以下、KVS)に送信された動画を処理する前段階として、Lambda関数からKVSにアクセスし動画を取得する方法を調べました。
アクセス方法
(前提として、以下では、KVSで「python_test」という名前でストリームを作成してあり、そのストリームに対して動画を送信している状態であるものとします。送信する方法については、先程のリンク先をご参照ください。)
手順
以下の手順で、AWSの環境を準備します。
手順1:OpenCVをインストールしたLambda レイヤーを作成する
- python/lib/python3.6/site-packagesとフォルダを作成し、site-packages内で「pip3 install -t . opencv-python」と実行し、ライブラリをインストールします(python3のバージョンが異なる場合は、python3.6を適宜変更してください)
- pythonがトップの階層に来るように、zipで圧縮します
- AWSのマネジメントコンソールから、Lambdaのページに移動します。左ペインの「レイヤー」を選択し、「レイヤーの作成」をクリックします。作成したzipフォルダを指定し(他は適宜入力して)レイヤーを作成します。
手順2:Lambda関数を作成する
- ランタイムは、Lambdaレイヤーを作成した環境のPythonと、同じバージョンのPythonを選びます
- 作成したら、ページ下部の「レイヤー」から、手順1で作成したLambda レイヤーを追加します
- メモリサイズを512MB上げる(OpenCVをインポートするため。128MBだとProcess exited before completing requestエラーになりました)
- タイムアウトを10秒程度に伸ばす(下のコードを実行したら6秒程度かかりました)
- 「設定」の「アクセス権限」の「実行ロール」から、IAMのページに移動します。「ポリシーをアタッチします」から、作成してあるKVSのストリームへのアクセス権限のあるポリシーを追加します。
実装コード
上記の手順で作成したLambda関数のコードとして、以下の実装しました。2つファイルがあり、トップの階層に配置しました。
- kvs_utils.py
import boto3 import cv2 import os import logging logger = logging.getLogger() logger.setLevel(logging.INFO) logger.info(boto3.__version__) kv = boto3.client("kinesisvideo") def get_endpoint_get_clip(stream_name): endpoint_get_clip = kv.get_data_endpoint( StreamName=stream_name, APIName="GET_CLIP" )["DataEndpoint"] kvam = boto3.client("kinesis-video-archived-media", endpoint_url=endpoint_get_clip) logger.info(endpoint_get_clip) return kvam def get_clip_from_kvam(stream_name, start_timestamp_datetime, end_timestamp_datetime): # get endpoint kvam = get_endpoint_get_clip(stream_name) # get video data response = kvam.get_clip( StreamName = stream_name, ClipFragmentSelector = { "FragmentSelectorType": "PRODUCER_TIMESTAMP", "TimestampRange" : { "StartTimestamp": start_timestamp_datetime, # UTC "EndTimestamp": end_timestamp_datetime, # UTC }, }, ) body = response["Payload"].read() logger.info(f"body: {len(body):,}") # convert data (write to file) filename_base = str(start_timestamp_datetime).replace(':', '_').replace(" ", "_").replace("-", "_") EXT = "mkv" filepath = f"/tmp/out_{filename_base}.{EXT}" with open(filepath, "wb") as file: file.write(body) return response, body, filepath
- lambda_function.py
import json import datetime import boto3 import cv2 import kvs_utils import logging logger = logging.getLogger() logger.setLevel(logging.INFO) # KVSのストリーム名 STREAM_NAME = "python_test" # 取得したい動画の開始時刻(Producer Time) YEAR, MONTH, DAY, HOUR, MIN, SEC = 2021, 5, 18, 21, 25, 0 # JST # 取得したい動画の長さ(秒) DURATION = 4 def lambda_handler(event, context): # proc arg duration = event["duration"] if "duration" in event else DURATION stream_name = event["stream_name"] if "stream_name" in event else STREAM_NAME if "producer_timestamp_start" in event: producer_timestamp_start = event["producer_timestamp_start"] datetime_start = datetime.datetime.fromtimestamp(producer_timestamp_start) else: datetime_start = datetime.datetime(YEAR, MONTH, DAY, HOUR-9, MIN, SEC) # UTC datetime_end = datetime_start + datetime.timedelta(seconds=duration) logger.info(datetime_start) logger.info(datetime_end) # get clip response, body, filepath = kvs_utils.get_clip_from_kvam( stream_name, datetime_start, datetime_end) # extract images cap = cv2.VideoCapture(filepath) n_frame = cap.get(cv2.CAP_PROP_FRAME_COUNT) logger.info(f"n_frame {n_frame}") for i_frame in range(int(n_frame)): timestamp = datetime_start.timestamp() + duration * i_frame / n_frame ret, frame = cap.read() # proc image
動画を取得してくる処理(kvs_utils.get_clip_from_kvam以降)の動作としては、次の通りです。
- KVS(kinesis-video-archived-media)から動画を取得する用のエンドポイントを取得する
- 取得対象のタイムスタンプ(始点・終点)を指定して、KVSから動画データを取得する
- 動画データを扱いやすいオブジェクトに変換するために、tmp領域にmkvファイルとして一旦書き出し、cv2.VideoCaptureで読み込み直す(他にも方法があるかもしれません)
これによって、動画を読み出すことができました。
注意点
以下では、実装中に詰まった点・注意点について記載します。
- 指定時刻(パラメータ)と、取得される動画の単位
kvs_utils.get_clip_from_kvam内で使用しているkvam.get_clipでは、引数としてTimestampRangeのStartTimestampとEndTimestampを指定します。このとき、動画はこの時刻で厳密に切り取られるのではなく、KVSに送信した動画の秒数に依存します。
具体的には、KVSに2秒ずつのファイルに分けて送信していた場合、タイムスタンプ(DURATION)として1or2秒分を指定すると2秒分が返される、3or4秒分を指定すると4秒分が返される、という結果になりました。また、DURATIONとして2秒分を指定した状態で、開始タイムスタンプを1秒ずつずらすと、2秒分が返ってくる場合と4秒分が返ってくる場合が交互になりました。
おそらく、ストリームが保持している動画のチャンク単位で取り出す形になっているようです。指定した秒数と取得される動画の長さが違う点に注意が必要です。
- KVSのエンドポイントの種類
補足ですが、ドキュメントなどを読んでいると、KVSでは、(kinesis-video-archived-mediaの)GET_CLIPの他に、(kinesis-video-media)GET_MEDIAがあります(実装としては、以下のような形になるかと思います)。この方法だと、引数としてAfterFragmentNumberというものがあるものの、直接終了時刻を指定できないようでした。
def get_endpoint_get_media(stream_name): endpoint_get_media = kv.get_data_endpoint( StreamName=stream_name, APIName="GET_MEDIA" )["DataEndpoint"] kvm = boto3.client("kinesis-video-media", endpoint_url=endpoint_get_media) logger.info(endpoint_get_media) return kvm def get_media_from_kvm(stream_name, start_timestamp_datetime): kvm = get_endpoint_get_media(stream_name) response = kvm.get_media( StreamName=stream_name, StartSelector={ 'StartSelectorType': 'PRODUCER_TIMESTAMP', # 'AfterFragmentNumber': "10", 'StartTimestamp': start_timestamp_datetime, # 'ContinuationToken': 'string' }, ) payload = response["Payload"] body = payload.read() filename_base = str(start_timestamp_datetime).replace(':', '_').replace(" ", "_").replace("-", "_") EXT = "webm" filepath = f"/tmp/out_{filename_base}.{EXT}" with open(filepath, "wb") as file: file.write(body)
まとめ
Lambda関数からKinesis Video Streamsの動画を取得する方法を調べました。エンドポイントを作成し、取得した動画をファイルに一旦書き出す方法で実装しました。パラメータで指定する時刻と実際に取得される動画の長さは少しわかりにくく、注意が必要です。